package me.seu.demo.controller;

import lombok.extern.slf4j.Slf4j;
import me.seu.demo.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

/**
 * REST controller exposing demo endpoints under {@code /kafka} that publish
 * messages to Kafka, either through the project's {@link KafkaProducer} or
 * directly through a {@link KafkaTemplate}.
 *
 * @author liangfeihu
 * @since 2020/3/9 12:12
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
@SuppressWarnings("ALL")
public class KafkaController {
    /** Number of messages published per call by the sync and async demo endpoints. */
    private static final int NUM_LIMIT = 5;

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Triggers {@link KafkaProducer#send()}.
     *
     * @return the fixed string {@code "success"}
     */
    @RequestMapping("/msg/send")
    public String testSendMsg() {
        producer.send();
        return "success";
    }

    /**
     * Publishes the path variable as a message to the {@code topic-kl} topic.
     * The send result is not awaited or inspected.
     *
     * @param input message payload taken from the URL path
     * @return the fixed string {@code "success kl msg"}
     */
    @GetMapping("/send/{input}")
    public String sendFoo(@PathVariable String input) {
        template.send("topic-kl", input);
        return "success kl msg";
    }

    /**
     * Publishes the path variable as a message to the {@code topic-kln} topic.
     * The send result is not awaited or inspected.
     * NOTE(review): the {@code dlt} path segment suggests this feeds a
     * dead-letter-topic demo; confirm against the consumer configuration.
     *
     * @param input message payload taken from the URL path
     * @return the fixed string {@code "success dlt msg"}
     */
    @GetMapping("/send/dlt/{input}")
    public String sendFoo2(@PathVariable String input) {
        template.send("topic-kln", input);
        return "success dlt msg";
    }

    /**
     * Synchronous send: publishes {@link #NUM_LIMIT} messages to the
     * {@code test_kafka} topic, blocking on each send until it completes.
     *
     * <p>A failed send is logged and the loop moves on to the next message.
     * If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the remaining sends are skipped. No exception is
     * propagated to the caller in either case.
     *
     * @return the fixed string {@code "success sync"}, regardless of individual send failures
     */
    @RequestMapping("/msg/sync")
    public String syncSendMessage() {
        for (int i = 0; i < NUM_LIMIT; i++) {
            try {
                template.send("test_kafka", String.valueOf(i), "sync-msg-" + i).get();
            } catch (InterruptedException e) {
                log.error("sync send message fail [{}] ", e.getMessage(), e);
                // Restore the interrupt flag so callers further up the stack can see it,
                // and stop: any further blocking get() would fail immediately anyway.
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                log.error("sync send message fail [{}] ", e.getMessage(), e);
            }
        }
        return "success sync";
    }

    /**
     * Asynchronous send: publishes {@link #NUM_LIMIT} messages to the
     * {@code test_kafka} topic without blocking, registering a callback on
     * each send that logs the outcome.
     *
     * @return the fixed string {@code "success async"}, returned without waiting for the sends
     */
    @RequestMapping("/msg/async")
    public String sendMessageAsync() {
        for (int i = 0; i < NUM_LIMIT; i++) {
            // SendResult: when a message is written to Kafka successfully, the result
            // carries a RecordMetadata object. result.getRecordMetadata() holds the
            // topic and partition information as well as the offset within the partition.
            ListenableFuture<SendResult<String, String>> send =
                    template.send("test_kafka", String.valueOf(i), "async-msg-" + i);
            send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("async send message success partition [{}]", result.getRecordMetadata().partition());
                    log.info("async send message success offest[{}]", result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    log.error("async send message fail [{}]", ex.getMessage(), ex);
                }
            });
        }
        return "success async";
    }

}
